package com.bawei.persona.realtime.app.dwm;

import com.alibaba.fastjson.JSON;

import com.bawei.persona.realtime.bean.PaymentWideNew;
import com.bawei.persona.realtime.util.MyKafkaUtil;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * 项目规划及管理
 * 上海大数据学院院长 ：孙丰朝
 * 上海物联网学院院长：李剑
 * 技术指导及需求分析：郭洵
 * 编程：楚志高
 *
 * @author bawei  bigdata sh
 * @since 2021-06-11
 */

/**
 * {@code scan.startup.mode} specifies the position from which the Kafka source starts reading.
 * The available options are:
 * <ul>
 *   <li>group-offsets: start from committed offsets in ZK / Kafka brokers of a specific consumer group.</li>
 *   <li>earliest-offset: start from the earliest offset possible.</li>
 *   <li>latest-offset: start from the latest offset.</li>
 *   <li>timestamp: start from user-supplied timestamp for each partition.</li>
 *   <li>specific-offsets: start from user-supplied specific offsets for each partition.</li>
 * </ul>
 */
/**
 * Flink SQL job that builds the payment wide table.
 *
 * <p>It registers two Kafka-backed source tables ({@code dwd_order_wide} and
 * {@code dwd_payment_info}) and one Kafka-backed sink table ({@code paymentwide}), then runs an
 * {@code INSERT INTO paymentwide} that inner-joins each payment with the order-wide rows of the
 * same {@code order_id} whose event time lies within the 25 minutes preceding the payment.
 * The joined rows are additionally read back from the sink table and printed to stdout.
 */
public class PaymentWideSqlHuaXiangApp {

    /**
     * DDL of the order-wide source table. Note that the table is named {@code dwd_order_wide}
     * while the backing Kafka topic is {@code dwm_order_wide}.
     *
     * <p>NOTE(review): {@code FROM_UNIXTIME} interprets its argument as epoch seconds; confirm
     * that {@code create_ts} is not delivered in milliseconds.
     */
    private static final String ORDER_WIDE_DDL =
            "create table dwd_order_wide( \n" +
            "                 sku_num BIGINT, \n" +
            "                 spu_name STRING, \n" +
            "                 user_gender STRING, \n" +
            "                 original_total_amount  DOUBLE, \n" +
            "                 tm_name STRING, \n" +
            "                 order_status STRING , \n" +
            "                 province_area_code STRING, \n" +
            "                 sku_name STRING , \n" +
            "                 spu_id INT, \n" +
            "                 email STRING, \n" +
            "                 create_time STRING, \n" +
            "                 user_age INT, \n" +
            "                 order_price DOUBLE, \n" +
            "                 sku_id BIGINT, \n" +
            "                 detail_id BIGINT, \n" +
            "                 coupon_reduce_amount DOUBLE, \n" +
            "                 province_name STRING, \n" +
            "                 tm_id BIGINT, \n" +
            "                 province_id BIGINT, \n" +
            "                 total_amount DOUBLE, \n" +
            "                 user_id BIGINT, \n" +
            "                 feight_fee DOUBLE, \n" +
            "                  split_activity_amount DOUBLE, \n" +
            "                  split_coupon_amount DOUBLE, \n" +
            "                 province_iso_code STRING, \n" +
            "                 activity_reduce_amount  DOUBLE, \n" +
            "                 category3_name STRING, \n" +
            "                 order_id BIGINT, \n" +
            "                 category3_id BIGINT, \n" +
            "                 province_3166_2_code STRING, \n" +
            "                 split_total_amount DOUBLE, \n" +
            "                 create_ts BIGINT , \n" +
            "                 category1_id String,\n" +
            "                 category2_id String,\n" +
            "                 category1_name String,\n" +
            "                 category2_name String,\n" +
            "                 carrier String,\n" +
            "                 carriername String,\n" +
            "                 user_gendertype String,\n" +
            "                 emailtype String,\n" +
            "                 yearbasetype String,\n" +
            "                 yearbasename String,\n" +
            "                  ets AS TO_TIMESTAMP(FROM_UNIXTIME(create_ts )), \n" +
            "                 WATERMARK FOR ets AS ets - INTERVAL '3' SECOND \n" +
            "                )  with ( \n" +
            "                 'connector' = 'kafka', \n" +
            "                 'topic' = 'dwm_order_wide', \n" +
            "                 'properties.bootstrap.servers' = 'hadoop102:9092', \n" +
            "                 'properties.group.id' = 'paymentwide_app_group', \n" +
            "                 'scan.startup.mode' = 'latest-offset', \n" +
            "                 'format' = 'json' \n" +
            "                ) ";

    /** DDL of the payment source table, backed by the Kafka topic {@code dwd_payment_info}. */
    private static final String PAYMENT_INFO_DDL =
            "create table dwd_payment_info(\n" +
            "callback_time string ,\n" +
            "payment_type string ,\n" +
            "out_trade_no string ,\n" +
            "create_time string ,\n" +
            "user_id  BIGINT ,\n" +
            "total_amount DOUBLE ,\n" +
            "subject STRING ,\n" +
            "trade_no STRING ,\n" +
            "id  BIGINT,\n" +
            "order_id BIGINT,\n" +
            " create_ts BIGINT , \n" +
            "   ets AS TO_TIMESTAMP(FROM_UNIXTIME(create_ts )),\n" +
            "  WATERMARK FOR ets AS ets - INTERVAL '3' SECOND \n" +
            ") with (\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = 'dwd_payment_info',\n" +
            "  'properties.bootstrap.servers' = 'hadoop102:9092',\n" +
            "  'properties.group.id' = 'paymentwide_app_group',\n" +
            "  'scan.startup.mode' = 'latest-offset',\n" +
            "  'format' = 'json'\n" +
            ") ";

    /** DDL of the payment-wide output table, backed by the Kafka topic {@code dwm_payment_wide}. */
    private static final String PAYMENT_WIDE_DDL =
            "create table paymentwide( \n" +
            "                 sku_num BIGINT, \n" +
            "                 spu_name STRING, \n" +
            "                 user_gender STRING, \n" +
            "                 original_total_amount  DOUBLE, \n" +
            "                 tm_name STRING, \n" +
            "                 order_status STRING , \n" +
            "                 province_area_code STRING, \n" +
            "                 sku_name STRING , \n" +
            "                 spu_id INT, \n" +
            "                 order_create_time STRING, \n" +
            "                 user_age INT, \n" +
            "                 order_price DOUBLE, \n" +
            "                 sku_id BIGINT, \n" +
            "                 detail_id BIGINT, \n" +
            "                 coupon_reduce_amount DOUBLE, \n" +
            "                 province_name STRING, \n" +
            "                 tm_id BIGINT, \n" +
            "                 province_id BIGINT, \n" +
            "                 total_amount DOUBLE, \n" +
            "                 user_id BIGINT, \n" +
            "                 feight_fee DOUBLE, \n" +
            "                 province_iso_code STRING, \n" +
            "                 activity_reduce_amount  DOUBLE, \n" +
            "                 category3_name STRING, \n" +
            "                 category3_id BIGINT, \n" +
            "                 province_3166_2_code STRING, \n" +
            "                 split_total_amount DOUBLE, \n" +
            "                 order_id  BIGINT, \n" +
            "                 callback_time string , \n" +
            "                 payment_type string , \n" +
            "                 payment_create_time string , \n" +
            "                 payment_total_amount DOUBLE , \n" +
            "                 subject STRING , \n" +
            "                 trade_no STRING , \n" +
            "                 payment_id  BIGINT , \n" +
            "                 order_create_ts BIGINT, \n" +
            "                 payment_create_ts BIGINT, \n" +
            "                 split_activity_amount DOUBLE, \n" +
            "                 split_coupon_amount DOUBLE, \n" +
            "                 out_trade_no string,\n" +
            "                 category1_id String,\n" +
            "                 category2_id String,\n" +
            "                 category1_name String,\n" +
            "                 category2_name String,\n" +
            "                 carrier String,\n" +
            "                 carriername String,\n" +
            "                 user_gendertype String,\n" +
            "                 emailtype String,\n" +
            "                 yearbasetype String,\n" +
            "                 yearbasename String \n" +
            "                 ) \n" +
            "                 with ( \n" +
            "                 'connector' = 'kafka', \n" +
            "                 'topic' = 'dwm_payment_wide', \n" +
            "                 'properties.bootstrap.servers' = 'hadoop102:9092', \n" +
            "                 'properties.group.id' = 'paymentwide_app_group', \n" +
            "                 'scan.startup.mode' = 'latest-offset', \n" +
            "                 'format' = 'json' \n" +
            "                 ) ";

    /**
     * Join that fills {@code paymentwide}. Columns are matched to the sink by position, so the
     * select list must stay in the same order as the columns of the sink table DDL. A payment
     * matches an order-wide row when both share {@code order_id} and the payment event time is
     * between the order event time and the order event time plus 25 minutes.
     */
    private static final String PAYMENT_WIDE_INSERT =
            "insert into paymentwide  \n" +
            "                 select  dwd_order_wide.sku_num , \n" +
            "                 dwd_order_wide.spu_name , \n" +
            "                 dwd_order_wide.user_gender , \n" +
            "                 dwd_order_wide.original_total_amount , \n" +
            "                 dwd_order_wide.tm_name , \n" +
            "                 dwd_order_wide.order_status , \n" +
            "                 dwd_order_wide.province_area_code , \n" +
            "                 dwd_order_wide.sku_name , \n" +
            "                 dwd_order_wide.spu_id , \n" +
            "                 dwd_order_wide.create_time , \n" +
            "                 dwd_order_wide.user_age , \n" +
            "                 dwd_order_wide.order_price , \n" +
            "                 dwd_order_wide.sku_id , \n" +
            "                 dwd_order_wide.detail_id , \n" +
            "                 dwd_order_wide.coupon_reduce_amount , \n" +
            "                 dwd_order_wide.province_name , \n" +
            "                 dwd_order_wide.tm_id , \n" +
            "                 dwd_order_wide.province_id , \n" +
            "                 dwd_order_wide.total_amount , \n" +
            "                 dwd_order_wide.user_id , \n" +
            "                 dwd_order_wide.feight_fee , \n" +
            "                 dwd_order_wide.province_iso_code , \n" +
            "                 dwd_order_wide.activity_reduce_amount , \n" +
            "                 dwd_order_wide.category3_name , \n" +
            "                 dwd_order_wide.category3_id , \n" +
            "                 dwd_order_wide.province_3166_2_code , \n" +
            "                 dwd_order_wide.split_total_amount , \n" +
            "                 dwd_order_wide.order_id , \n" +
            "                  dwd_payment_info.callback_time , \n" +
            "                 dwd_payment_info.payment_type , \n" +
            "                 dwd_payment_info.create_time , \n" +
            "                 dwd_payment_info.total_amount , \n" +
            "                 dwd_payment_info.subject , \n" +
            "                 dwd_payment_info.trade_no , \n" +
            "                 dwd_payment_info.id , \n" +
            "                 dwd_order_wide.create_ts, \n" +
            "                 dwd_payment_info.create_ts , \n" +
            "                 dwd_order_wide.split_activity_amount , \n" +
            "                 dwd_order_wide.split_coupon_amount , \n" +
            "                 dwd_payment_info.out_trade_no  ,\n" +
            "                 dwd_order_wide.category1_id ,\n" +
            "                 dwd_order_wide.category2_id ,\n" +
            "                 dwd_order_wide.category1_name ,\n" +
            "                 dwd_order_wide.category2_name ,\n" +
            "                 dwd_order_wide.carrier ,\n" +
            "                 dwd_order_wide.carriername ,\n" +
            "                 dwd_order_wide.user_gendertype ,\n" +
            "                 dwd_order_wide.emailtype ,\n" +
            "                 dwd_order_wide.yearbasetype ,\n" +
            "                 dwd_order_wide.yearbasename \n" +
            "                 from  dwd_payment_info   inner join dwd_order_wide \n" +
            "                 on dwd_payment_info.order_id  = dwd_order_wide.order_id   \n" +
            "                 and dwd_payment_info.ets between dwd_order_wide.ets  and dwd_order_wide.ets +  INTERVAL '25' MINUTE";

    /**
     * Registers the tables, submits the join and prints the joined rows.
     *
     * @param args command-line arguments; not used
     * @throws Exception if table registration or job submission/execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Create the table environment on top of the streaming environment.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register the order-wide stream, the payment stream and the output table.
        // Deliberately no "select * ... print()" on a source table here: printing the result of
        // an unbounded Kafka query blocks the calling thread and nothing below would ever run.
        tableEnv.executeSql(ORDER_WIDE_DDL);
        tableEnv.executeSql(PAYMENT_INFO_DDL);
        tableEnv.executeSql(PAYMENT_WIDE_DDL);

        // Submit the join; this statement is what writes the wide records to dwm_payment_wide.
        tableEnv.executeSql(PAYMENT_WIDE_INSERT);

        // Read the produced wide records back from the sink table and convert them to a stream.
        Table table = tableEnv.sqlQuery("select * from paymentwide ");
        DataStream<PaymentWideNew> paymentWideDataStream = tableEnv.<PaymentWideNew>toAppendStream(table, PaymentWideNew.class);

        // Console output only. The records must NOT be sent to Kafka again from here: this stream
        // is read from dwm_payment_wide, so sinking it into the same topic would re-publish every
        // record endlessly and duplicate what the INSERT above already wrote.
        paymentWideDataStream.print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");

        env.execute();
    }

}
